package com.shujia.opt

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

import scala.util.Random

object DoubleReduce {

  /**
   * Double aggregation ("salting") to mitigate data skew.
   *
   * A heavily-skewed key is detected by sampling, then salted with a random
   * numeric prefix so its records spread across partitions for a first
   * partial aggregation; the prefix is stripped and a second aggregation
   * produces the final counts. Generally suitable when the aggregation
   * logic is simple (commutative/associative, e.g. word count).
   */
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local").setAppName("app")
    val sc: SparkContext = new SparkContext(conf)
    val lines: RDD[String] = sc.textFile("spark/data/word")

    val wordRDD: RDD[String] = lines
      .flatMap(_.split(","))
      .filter(_.nonEmpty)

    // Sample ~10% of the data to find the key most likely to cause skew.
    val top1: Array[(String, Int)] = wordRDD
      .sample(withReplacement = true, 0.1)
      .map((_, 1))
      .reduceByKey(_ + _)
      .sortBy(-_._2)
      .take(1)

    // Key causing the skew. headOption guards against an empty sample
    // (the original `top1(0)` would throw ArrayIndexOutOfBoundsException);
    // with no detected key, the job degenerates to a plain word count.
    val skewedKey: Option[String] = top1.headOption.map(_._1)
    skewedKey.foreach(k => println("导致数据倾斜的key:" + k))

    val saltBound = 5 // number of distinct random prefixes to spread the hot key over

    wordRDD.map(word => {
      if (skewedKey.contains(word)) {
        // Salt the hot key with a random prefix in [0, saltBound).
        val salt: Int = Random.nextInt(saltBound)
        (salt + "-" + word, 1)
      } else {
        (word, 1)
      }
    })
      // First aggregation: reduceByKey combines map-side, unlike the original
      // groupByKey + sum which shuffled every individual record.
      .reduceByKey(_ + _)
      .map { case (k, cnt) =>
        // Strip the salt only when the key really starts with "<digit>-".
        // The original `contains("-")` + `split("-")(1)` corrupted ordinary
        // words containing a dash and dropped text after a second dash.
        // NOTE(review): a data word that itself matches "<digit>-..." would
        // still be mis-stripped; use a rarer separator if that can occur.
        if (k.matches("\\d-.*")) (k.split("-", 2)(1), cnt)
        else (k, cnt)
      }
      // Second aggregation: merge the per-salt partial counts.
      .reduceByKey(_ + _)
      .foreach(println)

    // Keep the driver alive so the Spark web UI remains accessible.
    // Sleep instead of busy-spinning (the original empty loop pegged a core).
    while (true) {
      Thread.sleep(1000)
    }

  }
}
